Conversation
sagehen03
commented
Jan 9, 2026
- adding enrichr end point
- [Multiquery] Allow multiple queries in a single call
- Fix so that it doesn't connect to S3 multiple times
- remove combine with method
- [Multiquery] Initial narrow use concurrent multiquery
- remove print
d3a8574 to
36f65cb
Compare
sagehen03
left a comment
There was a problem hiding this comment.
I have 3 comments made out of an abundance or caution.
| # start reading the records on-demand | ||
| self.record_filter = record_filter | ||
| self.records = self._readall() | ||
| if self.bytes_total <= config.response_limit: |
There was a problem hiding this comment.
| if self.bytes_total <= config.response_limit: | |
| if self.bytes_total <= config.response_limit and len(self.sources) > 1: |
I'd vote for only using the new _readparallel method when we have mutliple queries or multiple files to process otherwise we're spinning up a thread pool where we didn't previously.
| A generator that reads each of the records from S3 for the sources. | ||
| """ | ||
| record_map = {} | ||
| with concurrent.futures.ThreadPoolExecutor(max_workers=10) as pool: |
There was a problem hiding this comment.
We're creating a thread pool for each read parallel request here. I think it might be better to have a global pool that we use. It's a small concern, but we'd end up resource starved pretty quickly if we have a handful of requests coming in at once that all create a thread pool. If we had a global pool with say 8-10 threads, then requests could queue waiting for available threads.
|
|
||
| if self.record_filter is None or self.record_filter(record): | ||
| if source.record_filter is None or source.record_filter(record): | ||
| self.count += 1 |
There was a problem hiding this comment.
Now, that we're calling _readsource from multiple threads and using the same RecordSource object I think we have shared state between multiple threads. I think this probably only means some disagreement in the response metadata and the data, but I think we can probably either use synchronization or stack confined variables to fix.